package org.apache.lucene.index;

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.util.ArrayList;
import java.util.Iterator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.lucene.search.Query;
import org.apache.lucene.util.PriorityQueue;
import org.apache.lucene.index.BufferedDeletesStream.QueryAndLimit;

class CoalescedDeletes {
    final Map<Query, Integer> queries = new HashMap<Query, Integer>();
    final List<Iterable<Term>> iterables = new ArrayList<Iterable<Term>>();

    @Override
    public String toString() {
        // note: we could add/collect more debugging information
        return "CoalescedDeletes(termSets=" + iterables.size() + ",queries=" + queries.size() + ")";
    }

    void update(FrozenBufferedDeletes in) {
        iterables.add(in.termsIterable());

        for (int queryIdx = 0; queryIdx < in.queries.length; queryIdx++) {
            final Query query = in.queries[queryIdx];
            queries.put(query, BufferedDeletes.MAX_INT);
        }
    }

    public Iterable<Term> termsIterable() {
        return new Iterable<Term>() {
            public Iterator<Term> iterator() {
                ArrayList<Iterator<Term>> subs = new ArrayList<Iterator<Term>>(iterables.size());
                for (Iterable<Term> iterable : iterables) {
                    subs.add(iterable.iterator());
                }
                return mergedIterator(subs);
            }
        };
    }

    public Iterable<QueryAndLimit> queriesIterable() {
        return new Iterable<QueryAndLimit>() {

            public Iterator<QueryAndLimit> iterator() {
                return new Iterator<QueryAndLimit>() {
                    private final Iterator<Map.Entry<Query, Integer>> iter = queries.entrySet().iterator();

                    public boolean hasNext() {
                        return iter.hasNext();
                    }

                    public QueryAndLimit next() {
                        final Map.Entry<Query, Integer> ent = iter.next();
                        return new QueryAndLimit(ent.getKey(), ent.getValue());
                    }

                    public void remove() {
                        throw new UnsupportedOperationException();
                    }
                };
            }
        };
    }

    /** provides a merged view across multiple iterators */
    static Iterator<Term> mergedIterator(final List<Iterator<Term>> iterators) {
        return new Iterator<Term>() {
            Term current;
            TermMergeQueue queue = new TermMergeQueue(iterators.size());
            SubIterator[] top = new SubIterator[iterators.size()];
            int numTop;

            {
                int index = 0;
                for (Iterator<Term> iterator : iterators) {
                    if (iterator.hasNext()) {
                        SubIterator sub = new SubIterator();
                        sub.current = iterator.next();
                        sub.iterator = iterator;
                        sub.index = index++;
                        queue.add(sub);
                    }
                }
            }

            public boolean hasNext() {
                if (queue.size() > 0) {
                    return true;
                }

                for (int i = 0; i < numTop; i++) {
                    if (top[i].iterator.hasNext()) {
                        return true;
                    }
                }
                return false;
            }

            public Term next() {
                // restore queue
                pushTop();

                // gather equal top fields
                if (queue.size() > 0) {
                    pullTop();
                } else {
                    current = null;
                }
                return current;
            }

            public void remove() {
                throw new UnsupportedOperationException();
            }

            private void pullTop() {
                // extract all subs from the queue that have the same top term
                assert numTop == 0;
                while (true) {
                    top[numTop++] = queue.pop();
                    if (queue.size() == 0 || !(queue.top()).current.equals(top[0].current)) {
                        break;
                    }
                }
                current = top[0].current;
            }

            private void pushTop() {
                // call next() on each top, and put back into queue
                for (int i = 0; i < numTop; i++) {
                    if (top[i].iterator.hasNext()) {
                        top[i].current = top[i].iterator.next();
                        queue.add(top[i]);
                    } else {
                        // no more terms
                        top[i].current = null;
                    }
                }
                numTop = 0;
            }
        };
    }

    private static class SubIterator {
        Iterator<Term> iterator;
        Term current;
        int index;
    }

    private static class TermMergeQueue extends PriorityQueue<SubIterator> {
        TermMergeQueue(int size) {
            initialize(size);
        }

        @Override
        protected boolean lessThan(SubIterator a, SubIterator b) {
            final int cmp = a.current.compareTo(b.current);
            if (cmp != 0) {
                return cmp < 0;
            } else {
                return a.index < b.index;
            }
        }
    }
}
